Kafka Producer API

以下适用于版本2.0.x。

public class KafkaProducer<K,V> extends java.lang.Object implements Producer<K,V>

线程安全

为了提高效率,建议实现为单例。

版本>=0.10,客户端可以与broker通信。版本不一致抛出UnsupportedVersionException。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// 控制请求完成的标准。all表示记录完整提交前阻塞,最慢但是最持久。
props.put("acks", "all");
// 失败重试次数,详见http://kafka.apache.org/documentation.html#semantics
props.put("retries", 0);
// 每个分区的批处理记录数。越大批处理容量越大,但内存占用越多,因为每个分区都有一个。
props.put("batch.size", 16384);
// 整合请求的时间窗口大小(ms)。将时间窗口内的请求合并为一个请求,可以减少请求数量,提高效率,但是增大延迟。
// 在请求过多时,会忽略该设置,合并相近的请求
props.put("linger.ms", 1);
// 总的声称者缓冲区大小
// 缓冲用尽后,阻塞发送。阻塞max.block.ms时间后,抛出TimeoutException
props.put("buffer.memory", 33554432);
// 以下分别设置键名和键值的序列化方式
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++) {
// send()是异步发送的
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
}

// 后台IO线程使用缓冲区缓存尚未传输的记录。生成关闭失败,可能导致内存泄漏。
producer.close();

版本>=0.11,Kafka支持两种Producer:幂等和事务。

  • 幂等

    幂等生产者加强了Kafka的传递语义。即使重试,也只传递仅一次。

    通过enable.idempotence配置。自动将重试设为Integer.MAX_VALUE,将请求完成标准acks设置为all。无需其他额外配置。

    为了实现幂等语义(仅传递一次),需要避免应用级别的重发。建议不要设置重试次数。如果在重试次数无数的情况下,依旧抛出异常,建议关闭生产者,并检查最近生产的消息是否重复。

    只在单个会话中保证幂等。

  • 事务

    事务允许发送消息给多个分区或主题。

    事务API是阻塞的,失败时抛出异常。

    transactional.id用于在单一生产者的多个会话中事务恢复。用于分区、有状态的应用间唯一标识生产者。设置后幂等生产者相关的配置也被设置。此外,关联的主题应该设置持久化配置。如replication.factor为3,min.insync.replicas为2。为了实现端到端的事务保证,消费者应该只读取提交的消息。

    示例如下,与上述示例类似,但所有的100条消息都在一个事务中。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("transactional.id", "my-transactional-id");
    Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

    producer.initTransactions();

    try {
    producer.beginTransaction();
    for (int i = 0; i < 100; i++)
    producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
    producer.commitTransaction();
    } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // We can't recover from these exceptions, so our only option is to close the producer and exit.
    producer.close();
    } catch (KafkaException e) {
    // For all other exceptions, just abort the transaction and try again.
    // 标记已成功的写入为aborted
    producer.abortTransaction();
    }
    producer.close();

    注意:

    同一时刻,只能有一个开放的事务。

    beginTransaction()commitTransaction()间的消息属于同一事务。

    transactional.id设置后,发送的消息必须属于某个事务。

    事务过程中的异常直接抛出,不必调用Future或回调函数。

参考资料

Class KafkaProducer

Develop a Java Application With Kafka

Kafka Java Client